package com.codejiwei.flink.ck;

import com.codejiwei.flink.entity.TransientSink;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.lang.reflect.Field;
import java.sql.PreparedStatement;
import java.sql.SQLException;

/**
 * author: codejiwei
 * date: 2023/9/7
 * desc:
 **/
public class ClickHouseUtil {

    public static <T> SinkFunction getJdbcSink(String sql)  {
        return JdbcSink.<T>sink(
                //要执行的SQL语句
                sql,
                //执行写入操作，就是将当前流中的对象属性赋值给SQL的占位符 insert into visitor_stats_2021 value(?,?,?,?,?,?,?,?,?,?,?,?)
                new JdbcStatementBuilder<T>() {
                    //obj 就是流中的一条数据对象
                    @Override
                    public void accept(PreparedStatement ps, T obj) throws SQLException {
                        //获取当前类中的 所有的属性
                        Field[] fields = obj.getClass().getDeclaredFields();
                        //跳过的属性计数
                        int skipOffset = 0;
                        for (int i = 0; i < fields.length; i++) {
                            Field field = fields[i];


                            //通过属性对象获取属性上是否有@TransientSink注解
                            TransientSink transientSink = field.getAnnotation(TransientSink.class);
                            //如果transientSink不为空，那么说明属性上有@TransientSink标记，那么在给?占位符赋值的时候，应该跳过当前属性
                            if (transientSink != null) {
                                skipOffset++;
                                continue;
                            }


                            //设置私有属性的可访问性
                            field.setAccessible(true);

                            try {
                                //获取属性值
                                Object o = field.get(obj);
                                ps.setObject(i + 1 - skipOffset, o);
                            } catch (IllegalAccessException e) {
                                e.printStackTrace();
                            }

                        }
                    }
                },
                //构建者设计模式，创建JdbcExecutionOptions对象，给batchSize属性赋值，执行执行批次大小:这里设置的是5个一个批次
                new JdbcExecutionOptions.Builder()
                        .withBatchSize(5)
                        .withBatchIntervalMs(1000)
                        .withMaxRetries(3)
                        .build(),
                //构建者设计模式，JdbcConnectionOptions，给连接相关的属性进行赋值
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:clickhouse://hadoop102:8123/default")
                        .withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
                        .build()
        );
    }
}
